Add audio JSONL reader and tarred ASR dataset support #1780
ssh-meister wants to merge 27 commits into main
Signed-off-by: Sasha Meister <ameister@nvidia.com>
Greptile Summary
This PR introduces audio-focused JSONL reading (
All previously flagged P0/P1 issues have been addressed in this revision:
Confidence Score: 5/5
Safe to merge; all previously flagged P0/P1 issues have been resolved and remaining findings are P2 suggestions.
All eight previously flagged issues across multiple review rounds have been addressed, including the nemo_curator/stages/audio/io/materialize.py (sub-sample duration rounding), nemo_curator/stages/file_io/files.py (temp-file cleanup tracking)
Important Files Changed
Sequence Diagram
sequenceDiagram
participant Pipeline
participant TarredAudioManifestReader
participant TarredAudioManifestPartitionStage
participant TarredAudioManifestReaderStage
participant MaterializeTarredAudioStage
participant CleanupTemporaryAudioStage
Pipeline->>TarredAudioManifestReader: decompose()
TarredAudioManifestReader-->>TarredAudioManifestPartitionStage: expand sharded manifest paths
TarredAudioManifestPartitionStage-->>TarredAudioManifestReaderStage: FileGroupTask (manifest shard list)
TarredAudioManifestReaderStage->>TarredAudioManifestReaderStage: match shard_id to tar_path
TarredAudioManifestReaderStage->>TarredAudioManifestReaderStage: build_audio_sample_key per entry
TarredAudioManifestReaderStage-->>Pipeline: list[AudioTask] (with _tar_path, _tar_member, sample_key)
Pipeline->>MaterializeTarredAudioStage: process_batch(tasks)
MaterializeTarredAudioStage->>MaterializeTarredAudioStage: group by (tar_path, tar_member)
MaterializeTarredAudioStage->>MaterializeTarredAudioStage: open_binary_stream -> tarfile streaming
MaterializeTarredAudioStage->>MaterializeTarredAudioStage: _should_segment? -> write bytes or soundfile segment
MaterializeTarredAudioStage-->>Pipeline: tasks with audio_filepath -> local temp (or durable) path
Pipeline->>CleanupTemporaryAudioStage: process(task)
CleanupTemporaryAudioStage->>CleanupTemporaryAudioStage: unlink _temporary_audio_path
CleanupTemporaryAudioStage->>CleanupTemporaryAudioStage: restore audio_filepath from _manifest_audio_filepath
CleanupTemporaryAudioStage-->>Pipeline: AudioTask (manifest path restored)
Reviews (17): Last reviewed commit: "sample_key declaration"
@ssh-meister plz fix linter and cherry-pick this to dev branch with #1780 hashtag
…plain manifest support, and keep tarred audio materialization on a shared core. Signed-off-by: Sasha Meister <ameister@nvidia.com>
@dataclass
class AudioManifestReader(CompositeStage[_EmptyTask, AudioTask]):
Can we use the existing manifest reader?
@dataclass
class JsonlReader(CompositeStage[_EmptyTask, DocumentBatch]):
class JsonlAudioReaderStage(ProcessingStage[FileGroupTask, AudioTask]):
It seems confusing to have this under stages/text. It should be under audio, right?
def outputs(self) -> tuple[list[str], list[str]]:
    output_fields = list(self.fields or [])
    if self._generate_ids or self._assign_ids:
Is there a use case for generate/assign IDs here?
_generate_ids: bool = False
_assign_ids: bool = False
Are these used for some audio pipelines, or are they just there to match the text version?
if self._generate_ids or self._assign_ids:
    from nemo_curator.stages.deduplication.id_generator import get_id_generator_actor

    try:
        self.id_generator = get_id_generator_actor()
    except ValueError:
        msg = (
            "ID generator is required when self._generate_ids or self._assign_ids is True, "
            "and the actor 'id_generator' does not exist. Please start the id_generator actor."
        )
        raise RuntimeError(msg) from None
    assert result[0].data.iloc[0]["text"] == "hi"


def test_process_batch_serializes_constructor_sample_key() -> None:
We don't need this test IMO.
Is there a cleanup to remove any files created by these tests?
No copyright needed on an empty file.
Yeah it's a bit confusing that audio tests are in the text directory.
Description
Adds audio-focused JSONL reading, plain audio manifest reading, tarred ASR dataset support, and generic file-transfer stages to NeMo Curator.
This PR extends `JsonlReader` with `task_type="audio"` so JSONL manifests can now emit `AudioTask` objects directly instead of only `DocumentBatch`. The new audio reader path supports one-manifest-line-per-task fanout and also preserves stable `_curator_dedup_id` assignment via `_generate_ids` / `_assign_ids`.

In addition, this PR adds a plain audio manifest reader for JSONL manifests that already contain real file paths or URIs (for example local paths or `s3://...` URIs):

- `AudioManifestReader` reads JSONL manifests into `AudioTask`s while preserving the original `audio_filepath`
- populates `sample_key` values for downstream processing and checkpoint-friendly behavior

This PR also introduces a bridge for NeMo-style tarred audio datasets:

- `TarredAudioManifestReader` reads sharded manifests and matches them to tar shards by shard id
- `MaterializeTarredAudioStage` extracts only the needed tar member into a local file just before path-based audio stages
- `CleanupTemporaryAudioStage` removes temporary files afterwards and restores the original manifest-style `audio_filepath`

This avoids eager extraction of the whole tar dataset, keeps compatibility with existing path-based ASR stages, and supports both strict and permissive handling of manifest entries missing from tar shards via `skip_missing_entries`.

`TarredAudioManifestReader` also follows the transport ideas used in NeMo's nemo_adapters.py, including support for `fsspec`-based remote access and `pipe:`-style specifiers, so the same reader can work with local files, S3-backed storage, and AIS-style commands such as `pipe:ais get ...`.

To support remote file workflows beyond tarred datasets, this PR also adds generic file stages under `nemo_curator.stages.file_io`:

- `MaterializeFilesStage`
- `UploadFilesStage`
- `DeleteFilesStage`
- `UploadManifestStage`

These stages are designed for `dict`-backed tasks and operate on configurable field paths, including nested paths such as `artifacts.local.audio_path`. If a configured field path resolves to a string path or URI, the stage can materialize, upload, or delete the corresponding file. This makes the functionality reusable outside audio-specific code while still fitting naturally into audio pipelines.

`UploadManifestStage` operates on `FileGroupTask` outputs from writer stages, so it can be chained after `JsonlWriter`, `ParquetWriter`, ALM manifest writing, or other writers that emit file groups.

Checkpoint / resume building blocks
This PR also adds the first building blocks for audio-stage checkpointing and resume.
A stable `sample_key` has been introduced for `AudioTask` as a per-sample identity that is independent of runtime-specific `task_id` values. This key is now populated by the following audio reader paths:

- `JsonlReader(task_type="audio")` / `JsonlAudioReaderStage`
- `TarredAudioManifestReaderStage`
- `AudioManifestReader`

If an input manifest entry already contains `sample_key`, it is preserved as-is. Otherwise, a deterministic key is derived from stable sample identity fields such as `audio_filepath`, tar shard/member metadata, `offset`, `duration`, and `dataset_name`.

In addition, `MaterializeTarredAudioStage` supports optional durable materialization via:

`materialization_dir: str | None = None`

Behavior is:

- If `materialization_dir is None`, the stage writes extracted audio to temporary local files
- If `materialization_dir` is set, the stage writes extracted or segmented audio to a deterministic durable path derived from `sample_key`

This makes the tarred-audio bridge more checkpoint/resume-friendly without changing the default behavior for existing pipelines. `CleanupTemporaryAudioStage` continues to clean up true temporary files, while durable materialized files are left intact and `audio_filepath` is still restored back to the original manifest-style path after cleanup.

These changes do not yet implement full pipeline-level checkpoint orchestration, but they establish two core primitives needed for that next step:

- stable sample identity (`sample_key`)
- durable materialization (`materialization_dir`)

Pipeline Examples
1. Tarred ASR pipeline
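As a rough illustration of the extraction step a tarred pipeline relies on (pull one tar member into a local file just before a path-based stage, then remove it afterwards), here is a standard-library sketch. It does not use the NeMo Curator stages from this PR; `extract_member` and `build_demo_tar` are hypothetical helpers, and the in-memory archive stands in for a real tar shard.

```python
import io
import os
import tarfile
import tempfile


def extract_member(tar_stream, member_name: str) -> str:
    """Copy a single tar member to a temporary file and return its path.

    Hypothetical helper for illustration; not part of NeMo Curator.
    """
    # "r|*" reads the archive sequentially, so non-seekable streams also work.
    with tarfile.open(fileobj=tar_stream, mode="r|*") as archive:
        for member in archive:
            if member.name != member_name or not member.isfile():
                continue
            source = archive.extractfile(member)
            suffix = os.path.splitext(member_name)[1]
            with tempfile.NamedTemporaryFile(suffix=suffix, delete=False) as out:
                out.write(source.read())
            return out.name
    raise FileNotFoundError(f"{member_name!r} not found in tar stream")


def build_demo_tar() -> io.BytesIO:
    """Build a tiny in-memory tar with two fake audio members."""
    buffer = io.BytesIO()
    with tarfile.open(fileobj=buffer, mode="w") as archive:
        for name, payload in [("a.wav", b"first"), ("b.wav", b"second")]:
            info = tarfile.TarInfo(name=name)
            info.size = len(payload)
            archive.addfile(info, io.BytesIO(payload))
    buffer.seek(0)
    return buffer


# Materialize only the member that is needed.
local_path = extract_member(build_demo_tar(), "b.wav")
with open(local_path, "rb") as handle:
    extracted = handle.read()

# Cleanup step: remove the temporary file once downstream work is done.
os.unlink(local_path)
```

Streaming through the archive avoids unpacking every member to disk, which is the same motivation the description gives for avoiding eager extraction.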
2. Plain remote manifest -> local materialization -> ASR -> upload result manifest
3. Upload files from a nested field and then delete local copies
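The nested field-path behavior described for the file stages can be sketched with plain dictionaries. This is only an illustration under stated assumptions: `get_by_path` and `upload_then_delete` are hypothetical names, a local directory copy stands in for a real upload, and none of this is the actual `nemo_curator.stages.file_io` API.

```python
from __future__ import annotations

import shutil
import tempfile
from pathlib import Path
from typing import Any


def get_by_path(data: dict[str, Any], field_path: str) -> Any:
    """Resolve a dotted field path such as 'artifacts.local.audio_path'."""
    current: Any = data
    for key in field_path.split("."):
        if not isinstance(current, dict) or key not in current:
            return None  # path does not resolve
        current = current[key]
    return current


def upload_then_delete(task: dict[str, Any], field_path: str, dest_dir: Path) -> Path | None:
    """Copy the file referenced by field_path to dest_dir, then remove the local copy."""
    value = get_by_path(task, field_path)
    if not isinstance(value, str):
        return None  # only string paths are acted on
    source = Path(value)
    target = dest_dir / source.name
    shutil.copyfile(source, target)  # stand-in for an upload
    source.unlink()  # delete the local copy
    return target


# Demo with a temporary working directory.
work = Path(tempfile.mkdtemp())
(work / "remote").mkdir()
local_file = work / "clip.wav"
local_file.write_bytes(b"audio")

task = {"artifacts": {"local": {"audio_path": str(local_file)}}}
uploaded = upload_then_delete(task, "artifacts.local.audio_path", work / "remote")
```

Returning `None` for unresolved or non-string values keeps the helper a no-op on tasks that lack the configured field, which is one plausible reading of "if a configured field path resolves to a string path or URI".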
4. Upload files using a nested key field for destination naming
5. Durable tarred materialization for checkpoint-friendly runs
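To make the two checkpoint primitives concrete, the sketch below derives a deterministic key from identity fields and maps it to a stable file path. It is an assumption-laden illustration, not the PR's implementation: `build_sample_key`, `durable_path`, the choice of SHA-256, the field subset in `IDENTITY_FIELDS`, and the directory layout are all hypothetical.

```python
from __future__ import annotations

import hashlib
import json
from pathlib import Path
from typing import Any

# Assumed subset of the identity fields named in the PR description.
IDENTITY_FIELDS = ("audio_filepath", "offset", "duration", "dataset_name")


def build_sample_key(entry: dict[str, Any]) -> str:
    """Return the entry's existing sample_key, or derive a deterministic one."""
    existing = entry.get("sample_key")
    if existing:
        return str(existing)  # an existing key is preserved as-is
    identity = {name: entry.get(name) for name in IDENTITY_FIELDS}
    # Canonical JSON so the same fields always hash to the same key.
    canonical = json.dumps(identity, sort_keys=True, separators=(",", ":"))
    return hashlib.sha256(canonical.encode("utf-8")).hexdigest()


def durable_path(materialization_dir: str, sample_key: str, suffix: str = ".wav") -> Path:
    """Map a sample key to a stable path under materialization_dir."""
    # Hash again so arbitrary user-supplied keys become safe file names.
    digest = hashlib.sha256(sample_key.encode("utf-8")).hexdigest()
    return Path(materialization_dir) / digest[:2] / f"{digest}{suffix}"


entry = {
    "audio_filepath": "shard_0/utt1.wav",
    "offset": 0.0,
    "duration": 2.5,
    "dataset_name": "demo",
    "text": "hi",
}
key_a = build_sample_key(entry)
# Non-identity fields such as the transcript do not affect the key.
key_b = build_sample_key({**entry, "text": "changed transcript"})
path = durable_path("/tmp/materialized", key_a)
```

Because the path depends only on the key, a resumed run can check whether the file already exists before extracting it again.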